################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from abc import ABC, abstractmethod
from typing import TypeVar, List, Generic

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import Table

from pyflink.ml.core.param import WithParams

T = TypeVar('T')
E = TypeVar('E')
M = TypeVar('M')


class Stage(WithParams[T], ABC):
    """
    Base class for a node in a Pipeline or Graph. The interface is only a concept, and does not have
    any actual functionality. Its subclasses could be Estimator, Model, Transformer or AlgoOperator.
    No other classes should inherit this interface directly.

    Each stage is with parameters, and requires a public empty constructor for restoration.
    """

    @abstractmethod
    def save(self, path: str) -> None:
        """
        Saves this stage to the given path.
        """
        pass

    @classmethod
    @abstractmethod
    def load(cls, env: StreamExecutionEnvironment, path: str) -> T:
        """
        Instantiates a new stage instance based on the data read from the given path.
        """
        pass


class AlgoOperator(Stage[T], ABC):
    """
    An AlgoOperator takes a list of tables as inputs and produces a list of tables as results. It
    can be used to encode generic multi-input multi-output computation logic.
    """

    @abstractmethod
    def transform(self, *inputs: Table) -> List[Table]:
        """
        Applies the AlgoOperator on the given input tables and returns the result tables.

        :param inputs: A list of tables.
        :return: A list of tables.
        """
        pass


class Transformer(AlgoOperator[T], ABC):
    """
    A Transformer is an AlgoOperator with the semantic difference that it encodes the Transformation
    logic, such that a record in the output typically corresponds to one record in the input. In
    contrast, an AlgoOperator is a better fit to express aggregation logic where a record in the
    output could be computed from an arbitrary number of records in the input.
    """
    pass


class Model(Transformer[T], ABC):
    """
    A Model is typically generated by invoking :func:`~Estimator.fit`. A Model is a Transformer with
    the extra APIs to set and get model data.
    """

    def set_model_data(self, *inputs: Table) -> None:
        raise Exception("This operation is not supported.")

    def get_model_data(self) -> None:
        """
        Gets a list of tables representing the model data. Each table could be an unbounded stream
        of model data changes.

        :return: A list of tables.
        """
        raise Exception("This operation is not supported.")


class Estimator(Generic[E, M], Stage[E], ABC):
    """
    Estimators are responsible for training and generating Models.
    """

    def fit(self, *inputs: Table) -> Model[M]:
        """
        Trains on the given inputs and produces a Model.

        :param inputs: A list of tables.
        :return: A Model.
        """
        pass
